package com.traffic.visim.kafkamq.register;

import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @description: 消息消费
 * @author: mipengchong
 * @create: 2019-10-31 15:56
 **/
@Slf4j
@Component
public class KafkaProducer {


    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;


    public void send(String topic, String data) {

        topic = this.getTopic(topic);
        kafkaTemplate.setDefaultTopic(topic);

        GenericMessage message = getGenericMessage(data,null);

        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);
        String finalTopic1 = topic;
        send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送消息失败: topic: {},error: {}", finalTopic1, ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("发送结果: topic: {}", finalTopic1);
            }
        });
    }


    public void send(String topic, String key, String data) {
        topic = this.getTopic(topic);
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic,key,data);

        String finalTopic = topic;
        send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送消息失败: topic: {},key: {} , error: {}", finalTopic, key, ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("发送结果: topic: {},key:{}", finalTopic, key);
            }
        });
    }

    private GenericMessage getGenericMessage(String data,String key) {
        Map map = new HashMap();
        GenericMessage message = new GenericMessage(data,new MessageHeaders(map));
        return message;
    }


    /**
     * 发送partition
     *
     * @param topic
     * @param partition
     * @param key
     * @param data
     */
    public void send(String topic, Integer partition, String key, String data) {
        topic = this.getTopic(topic);
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, partition, key, data);

        String finalTopic = topic;
        send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送消息失败: topic: {},key: {} , error: {}", finalTopic, key, ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("发送结果: topic: {},key:{}", finalTopic, key);
            }
        });
    }

    // TODO 如果需要有序 自己实现

    private String getTopic(String topic) {
        return topic;
    }

}

